-
Notifications
You must be signed in to change notification settings - Fork 7.1k
[Data] Fix Union operator to avoid blocking when preserve order #59922
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[Data] Fix Union operator to avoid blocking when preserve order #59922
Conversation
Signed-off-by: You-Cheng Lin <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request fixes a blocking issue in the Union operator when preserve_order is enabled. The change introduces a streaming approach, flushing data from input operators as they complete, which is a good improvement. My review identifies a critical potential for deadlock because the flushing logic isn't triggered when an input stream finishes without sending a final block. I've also included a couple of suggestions to improve code readability and maintainability by reducing duplication and simplifying expressions. Addressing the critical issue is necessary to prevent hangs in the data processing pipeline.
python/ray/data/_internal/execution/operators/union_operator.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/operators/union_operator.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/operators/union_operator.py
Outdated
Show resolved
Hide resolved
Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
alexeykudinkin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@owenowenisme the problem is that we still we're exhausting 1 input before we move on to the next one, which means we're gonna be accumulating remaining outputs before producing.
Instead, please take a look how we achieve determinism in make_async_gen and apply the same technique here:
- Iterate over inputs in the same order
- Always deque 1 block and never skip an input!
- Once op completes you can start skipping it
python/ray/data/_internal/execution/operators/union_operator.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/operators/union_operator.py
Outdated
Show resolved
Hide resolved
|
@alexeykudinkin |
Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
…ot_block_when_preserve_order_true
Signed-off-by: You-Cheng Lin <[email protected]>
python/ray/data/_internal/execution/operators/union_operator.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/operators/union_operator.py
Outdated
Show resolved
Hide resolved
Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
e4e4007 to
da8d1b5
Compare
Description
Make the Union operator not blocking when
preserve_orderis enabled if_add_input_inneris called with the input in the front.